package com.hanxiaozhang.curator.source;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.curator.RetryLoop;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.LockInternalsDriver;
import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
import org.apache.curator.framework.recipes.locks.PredicateResults;
import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

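/**
 * Internal implementation of the lock: creates the lock node through the driver, waits on
 * the predecessor node until the lock is obtained, and supports revocation and release.
 *
 * @author hanxinghua
 * @create 2022/8/31
 * @since 1.0.0
 */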
public class MyLockInternals {

    /**
     * Curator client used for every ZooKeeper operation performed by this class.
     */
    private final CuratorFramework client;


    /**
     * Path handed to the driver when creating the lock node: the base path joined with the lock name.
     */
    private final String path;

    /**
     * Base path of the lock (validated in the constructor); the lock nodes are its children.
     */
    private final String basePath;

    /**
     * Driver that creates the lock node and decides whether a given node holds the lock.
     */
    private final LockInternalsDriver driver;

    /**
     * Name of the lock.
     */
    private final String lockName;


    /**
     * Revocation spec, or null when the lock is not revocable.
     * Held in an atomic reference so it can be set and cleared from different threads.
     */
    private final AtomicReference<MyRevocationSpec> revocable = new AtomicReference<MyRevocationSpec>(null);

    /**
     * Watcher used for revocable locks. Only data-change events are acted upon; they trigger
     * a re-check of the node data for the revoke message.
     */
    private final CuratorWatcher revocableWatcher = new CuratorWatcher() {
        @Override
        public void process(WatchedEvent event) throws Exception {
            final boolean dataChanged = event.getType() == Watcher.Event.EventType.NodeDataChanged;
            if (!dataChanged) {
                // every other event type is irrelevant for revocation
                return;
            }
            checkRevocableWatcher(event.getPath());
        }
    };

    /**
     * Watcher set on the node this client is waiting behind. Any event, regardless of its
     * type, wakes up the threads blocked in wait() on this instance.
     */
    private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            // the event itself is not inspected; it only serves as a wake-up signal
            MyLockInternals.this.notifyFromWatcher();
        }
    };

    /**
     * Maximum number of leases, i.e. how many participants may hold the lock at once.
     * It is passed to the driver on every acquisition check and can be changed via setMaxLeases.
     */
    private volatile int maxLeases;

    /**
     * Payload that marks a lock node as "revocation requested". It is compared byte-for-byte
     * with the data read from the lock node, so it is encoded with an explicit charset rather
     * than the platform default to keep the bytes identical on every JVM.
     */
    static final byte[] REVOKE_MESSAGE = "__REVOKE__".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    /**
     * Attempts to delete the base lock node so that its sequence numbering is reset.
     * The attempt is best-effort: it is silently abandoned when the node is still in use.
     *
     * @throws Exception any error other than the two deliberately ignored ones
     */
    public void clean() throws Exception {
        try {
            this.client.delete().forPath(this.basePath);
        } catch (KeeperException.NotEmptyException stillHasWaiters) {
            // other threads/processes are waiting - leave the node in place
        } catch (KeeperException.BadVersionException takenElsewhere) {
            // another thread/process got the lock - leave the node in place
        }
    }

    /**
     * Creates the lock internals for one lock located under the given base path.
     *
     * @param client    Curator client used for all ZooKeeper operations
     * @param driver    driver that creates the lock node and decides who holds the lock
     * @param path      base path of the lock; checked with PathUtils.validatePath
     * @param lockName  name joined to the base path to form the lock node path
     * @param maxLeases initial maximum number of leases
     */
    MyLockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
        // collaborators
        this.client = client;
        this.driver = driver;

        // lock identity: validated base path first, then the derived lock node path
        this.basePath = PathUtils.validatePath(path);
        this.lockName = lockName;
        this.path = ZKPaths.makePath(path, lockName);

        this.maxLeases = maxLeases;
    }

    /**
     * Updates the maximum number of leases and wakes every thread waiting on this instance
     * so that it can re-evaluate whether it now holds the lock.
     *
     * @param maxLeases new maximum number of leases
     */
    synchronized void setMaxLeases(int maxLeases) {
        this.maxLeases = maxLeases;
        // waiters block in wait() on this same monitor inside internalLockLoop
        this.notifyAll();
    }

    /**
     * Makes this lock revocable by storing the given revocation spec.
     *
     * @param entry spec supplying the executor and runnable to use when a revoke request is seen
     */
    void makeRevocable(MyRevocationSpec entry) {
        this.revocable.set(entry);
    }

    /**
     * Releases the lock by clearing the revocation spec and deleting the lock node.
     *
     * @param lockPath path of the lock node to delete
     * @throws Exception if deleting the node fails for a reason other than it being absent
     */
    void releaseLock(String lockPath) throws Exception {
        // once released, the lock is no longer revocable
        this.revocable.set(null);
        this.deleteOurPath(lockPath);
    }

    /**
     * Returns the Curator client this instance operates with.
     *
     * @return the client passed to the constructor
     */
    CuratorFramework getClient() {
        return this.client;
    }

    /**
     * Returns the full paths of all participant nodes, in sorted order.
     *
     * @param client   Curator client used to list the children
     * @param basePath path whose children are the participant nodes
     * @param lockName lock name handed to the sorter
     * @param sorter   sorter providing the sort key of each child name
     * @return immutable collection of {@code basePath} joined with each sorted child name
     * @throws Exception if listing the children fails
     */
    public static Collection<String> getParticipantNodes(CuratorFramework client, final String basePath, String lockName, LockInternalsSorter sorter) throws Exception {
        final List<String> sortedNames = getSortedChildren(client, basePath, lockName, sorter);

        // turn every child name into its full path: base path + child name
        final ImmutableList.Builder<String> fullPaths = ImmutableList.builder();
        for (String childName : sortedNames) {
            fullPaths.add(ZKPaths.makePath(basePath, childName));
        }
        return fullPaths.build();
    }


    /**
     * Lists the children of the base path and returns them sorted in ascending order of the
     * key that the sorter derives from each child name.
     *
     * @param client   Curator client used to list the children
     * @param basePath path whose children are listed
     * @param lockName lock name handed to the sorter
     * @param sorter   sorter providing the sort key of each child name
     * @return a new, sorted list of child names
     * @throws Exception if listing the children fails
     */
    public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
        // compares two child names by the sort key computed for each of them
        final Comparator<String> bySortKey = new Comparator<String>() {
            @Override
            public int compare(String left, String right) {
                final String leftKey = sorter.fixForSorting(left, lockName);
                final String rightKey = sorter.fixForSorting(right, lockName);
                return leftKey.compareTo(rightKey);
            }
        };

        // sort a copy, leaving the list returned by the client untouched
        final List<String> ordered = Lists.newArrayList(client.getChildren().forPath(basePath));
        Collections.sort(ordered, bySortKey);
        return ordered;
    }

    /**
     * Returns a sorted copy of the given child names, in ascending order of the key that the
     * sorter derives from each name. The input list is not modified.
     *
     * @param lockName lock name handed to the sorter
     * @param sorter   sorter providing the sort key of each child name
     * @param children child names to sort
     * @return a new, sorted list of child names
     */
    public static List<String> getSortedChildren(final String lockName, final LockInternalsSorter sorter, List<String> children) {
        final Comparator<String> bySortKey = new Comparator<String>() {
            @Override
            public int compare(String first, String second) {
                final String firstKey = sorter.fixForSorting(first, lockName);
                final String secondKey = sorter.fixForSorting(second, lockName);
                return firstKey.compareTo(secondKey);
            }
        };

        final List<String> ordered = Lists.newArrayList(children);
        Collections.sort(ordered, bySortKey);
        return ordered;
    }

    /**
     * Returns the sorted children of this lock's base path, using this lock's name and its
     * driver as the sorter.
     *
     * @return a new, sorted list of child names
     * @throws Exception if listing the children fails
     */
    List<String> getSortedChildren() throws Exception {
        return getSortedChildren(this.client, this.basePath, this.lockName, this.driver);
    }


    /**
     * Returns the name of this lock.
     *
     * @return the lock name passed to the constructor
     */
    String getLockName() {
        return this.lockName;
    }


    /**
     * Returns the driver used by this lock.
     *
     * @return the driver passed to the constructor
     */
    LockInternalsDriver getDriver() {
        return this.driver;
    }

    /**
     * Tries to acquire the lock, optionally bounded by a timeout.
     * <p>
     * A lock node is created through the driver and the method then waits until that node
     * holds the lock. If a NoNodeException occurs on the way, the whole attempt is repeated
     * for as long as the client's retry policy allows.
     *
     * @param time          maximum time to wait; only used when {@code unit} is not null
     * @param unit          unit of {@code time}, or null to wait without a time limit
     * @param lockNodeBytes data for the lock node; replaced by an empty array when the lock is revocable
     * @return the path of the created lock node if the lock was obtained, otherwise null
     * @throws Exception on ZooKeeper errors, or the NoNodeException once retries are exhausted
     */
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        final long startMillis = System.currentTimeMillis();

        // timeout in milliseconds; null stands for "no time limit"
        final Long millisToWait;
        if (unit == null) {
            millisToWait = null;
        } else {
            millisToWait = unit.toMillis(time);
        }

        // a revocable lock is created with empty node data instead of the caller's bytes
        final byte[] localLockNodeBytes;
        if (revocable.get() == null) {
            localLockNodeBytes = lockNodeBytes;
        } else {
            localLockNodeBytes = new byte[0];
        }

        int retryCount = 0;
        for (;;) {
            try {
                // create our lock node, then wait until it holds the lock (or we time out)
                final String ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                final boolean acquired = internalLockLoop(startMillis, millisToWait, ourPath);
                return acquired ? ourPath : null;
            } catch (KeeperException.NoNodeException e) {
                // Thrown when the lock node cannot be found, which can happen when the session
                // expires, etc. Start over if the retry policy permits, otherwise give up.
                final boolean retryAllowed = client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper());
                if (!retryAllowed) {
                    throw e;
                }
            }
        }
    }

    /**
     * Re-reads the data of the given node (re-registering the revocable watcher) and, when it
     * equals the revoke message, runs the revocation callback on its executor.
     * Does nothing when the lock is not revocable.
     *
     * @param path path of the node whose data is inspected
     * @throws Exception on errors other than the node being absent
     */
    private void checkRevocableWatcher(String path) throws Exception {
        final MyRevocationSpec spec = revocable.get();
        if (spec == null) {
            return;
        }
        try {
            final byte[] nodeData = client.getData().usingWatcher(revocableWatcher).forPath(path);
            if (!Arrays.equals(nodeData, REVOKE_MESSAGE)) {
                return;
            }
            spec.getExecutor().execute(spec.getRunnable());
        } catch (KeeperException.NoNodeException ignored) {
            // the node is gone, so there is nothing to revoke
        }
    }

    /**
     * Loops until our node holds the lock, the wait times out, or the client is no longer
     * in the STARTED state.
     * <p>
     * On each pass the sorted children are fetched and the driver is asked whether our node
     * holds the lock. If not, a watcher is set on the node reported by the driver and the
     * thread waits on this instance's monitor until notified or until the remaining time elapses.
     * Our node is deleted when the wait times out or when any exception is thrown.
     *
     * @param startMillis  time in milliseconds from which the elapsed wait is measured
     * @param millisToWait remaining wait time in milliseconds, or null to wait without a limit
     * @param ourPath      path of the lock node created for this attempt
     * @return true if the lock was obtained, false otherwise
     * @throws Exception any error raised while reading nodes or waiting; our node is deleted first
     */
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {

        // whether the lock has been obtained
        boolean haveTheLock = false;
        // whether our node has to be deleted before leaving this method
        boolean doDelete = false;

        try {
            // for a revocable lock, register the revocation watcher on our own node
            if (revocable.get() != null) {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
            // keep trying while the client is started and the lock has not been obtained
            while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
                // child names of the base path, sorted
                List<String> children = getSortedChildren();
                // name of our node relative to the base path (+1 skips the separating slash)
                String sequenceNodeName = ourPath.substring(basePath.length() + 1);
                // let the driver decide, from the sorted children and maxLeases, whether our node holds the lock
                PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                // lock obtained
                if (predicateResults.getsTheLock()) {
                    haveTheLock = true;
                }
                // lock not obtained
                else {
                    // full path of the node the driver tells us to watch
                    String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                    // hold this instance's monitor: required for wait(), and shared with notifyFromWatcher/setMaxLeases
                    synchronized (this) {
                        try {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            // set the watcher on the node we are waiting behind
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            // bounded wait: subtract the time already spent and check for timeout
                            if (millisToWait != null) {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                // timed out: mark our node for deletion and leave the loop
                                if (millisToWait <= 0) {
                                    doDelete = true;
                                    break;
                                }
                                // wait for a notification, at most for the remaining time
                                wait(millisToWait);
                            } else {
                                // unbounded wait for a notification
                                wait();
                            }
                        } catch (KeeperException.NoNodeException e) {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        } catch (Exception e) {
            // NOTE(review): an InterruptedException from wait() is rethrown without re-setting the
            // thread's interrupt flag - confirm that callers do not rely on the flag.
            doDelete = true;
            throw e;
        } finally {
            // delete our node if it was marked for deletion (timeout or failure)
            if (doDelete) {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

    /**
     * Deletes the given node using Curator's guaranteed delete; a node that no longer exists
     * is not treated as an error.
     *
     * @param ourPath path of the node to delete
     * @throws Exception on errors other than the node being absent
     */
    private void deleteOurPath(String ourPath) throws Exception {
        try {
            this.client.delete().guaranteed().forPath(ourPath);
        } catch (KeeperException.NoNodeException alreadyGone) {
            // ignore - already deleted (possibly expired session, etc.)
        }
    }

    /**
     * Wakes up all threads waiting on this instance's monitor; called from the node watcher.
     */
    private void notifyFromWatcher() {
        synchronized (this) {
            this.notifyAll();
        }
    }

}

